[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure #17088
Test build #73533 has started for PR 17088 at commit
Fetch failure does not imply a lost executor - it could be a transient issue. This is quite drastic for a fetch failure: Spark already has mechanisms in place to detect executor/host failure, which take care of these failure modes.
You are right, it could be transient, but we do have retries on the shuffle client to handle transient failures. When the driver receives a fetch failure, we always assume that the output is lost. The current model assumes the output is lost only for a particular executor, which makes sense only if the shuffle service is disabled and the executors are serving the shuffle files themselves. However, when the external shuffle service is enabled, a fetch failure means all output on that host should be marked unavailable.
I agree with @mridulm, a fetch failure does not imply that the executor is down, or that all the executors on the host are down.
Unfortunately, the mechanisms already in place are not sufficient. Imagine a situation where the shuffle service becomes unresponsive or OOMs; in that case we will not see any host failure, yet the driver will still receive fetch failures. The current model assumes all shuffle output for an executor is lost; however, since the shuffle service serves all the shuffle files on that host, we should mark all the shuffle files on that host as unavailable.
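To make the disagreement concrete, here is a toy model (plain Python, not Spark's actual code) of the scheduler's map-output bookkeeping, contrasting per-executor unregistration (the current behavior) with per-host unregistration (this PR's proposal when the external shuffle service is enabled):

```python
# Toy model of map-output locations, keyed by partition -> (executor, host).
# Names are illustrative; this is not the DAGScheduler's real data structure.
map_outputs = {
    0: ("exec-1", "hostA"),
    1: ("exec-2", "hostA"),  # a second executor on the same host
    2: ("exec-3", "hostB"),
}

def unregister_outputs_on_executor(outputs, exec_id):
    """Pre-change behavior: drop only the failing executor's output."""
    return {p: loc for p, loc in outputs.items() if loc[0] != exec_id}

def unregister_outputs_on_host(outputs, host):
    """Proposed behavior with the external shuffle service: the service
    serves every executor's files on the host, so a fetch failure means
    all of them should be treated as unavailable."""
    return {p: loc for p, loc in outputs.items() if loc[1] != host}

# A fetch failure reading from exec-1 on hostA:
per_exec = unregister_outputs_on_executor(map_outputs, "exec-1")
per_host = unregister_outputs_on_host(map_outputs, "hostA")
print(sorted(per_exec))  # [1, 2] -- exec-2's output on hostA still registered
print(sorted(per_host))  # [2]    -- everything on hostA marked unavailable
```

The per-executor variant leaves exec-2's output registered even though the shuffle service on hostA is the thing that failed, which is exactly the case this PR argues is wrong.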
Even if I completely agreed that removing all of the shuffle files on a host was the correct design choice, I'd still be hesitant to merge this right now. That is simply because we have recently merged other changes related to fetch failure handling, and I'd really like to see some more time pass with just those changes in the code before we introduce more fetch failure changes. I don't want to get in the situation of merging this PR then getting reports of fetch failure bugs in master a week later, and not knowing whether to place the blame on this PR or the other recent fetch failure changes. That needn't preclude more discussion of this PR or possibly merging it after we have a little more experience and confidence with the code already in master.
I'm also hesitant on this change, but I see the argument for it.
So it seems like things should be correct either way -- before this change, it may just take spark longer to remove all the shuffle files it needs to. With this change, spark may remove more shuffle files than it needs to, but then it would regenerate them. Even with the old behavior, a transient issue could lead to spark removing (and regenerating) more data than it needs to. We should try to recover as quickly as possible in the more common case, though I'm not certain yet which way to lean.
It's also worth keeping in mind that we don't retry a stage indefinitely; we only give it 4 chances (SPARK-5945). One thing which is helping us currently is that we leave tasks running even after a fetch failure, which is likely to detect all the bad execs on the host (which often means you need 2 retries, because you don't detect all the failures until after one retry has already started). But if we were to ever immediately kill tasks after the first fetch failure, then we'd make it more likely that you'd go over 4 retries, just because there were 4 execs on one host. And even with those running tasks, it's still relying on some amount of luck to ensure that you hit all those bad executors.
I don't know what the answer is yet; I'll keep thinking about it and we should keep discussing.
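The interaction with the retry limit can be sketched as a toy model (plain Python, illustrative only; 4 is the hard-coded constant discussed above):

```python
# Toy sketch of the fixed stage-retry budget (SPARK-5945): a stage is
# aborted once it fails this many consecutive attempts.
MAX_STAGE_FAILURES = 4

def run_stage_with_retries(attempt_fails):
    """attempt_fails(attempt) -> True if that attempt hits a fetch failure.
    Returns (succeeded, attempts_used)."""
    for attempt in range(MAX_STAGE_FAILURES):
        if not attempt_fails(attempt):
            return True, attempt + 1
    return False, MAX_STAGE_FAILURES

# If each retry only surfaces one more bad executor on a 4-executor host,
# three failing attempts use up nearly the whole budget:
print(run_stage_with_retries(lambda a: a < 3))  # (True, 4) -- barely made it
print(run_stage_with_retries(lambda a: True))   # (False, 4) -- job fails
```

Under per-executor unregistration, one bad host with several executors can plausibly consume the whole budget one executor at a time, which is the failure mode described above.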
@@ -1331,7 +1332,7 @@ class DAGScheduler(
      // TODO: mark the executor as failed only if there were lots of fetch failures on it
      if (bmAddress != null) {
-       handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+       handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
you could use bmAddress.host, then you wouldn't need to store another execToHost map (though it would require a little more refactoring)
I'm not sure it's correct to assume that a FetchFailure means that all of the executors on the slave were lost. You could have a failure because one executor died, but the other executors on the host are OK, right? (UPDATED: I realized this is the same comment @mridulm made above)
@kayousterhout - This change applies only when the external shuffle service is enabled; in that case, a fetch failure would mean that the external shuffle service is unavailable, so we should remove all the output on that host, right? When the shuffle service is not enabled, this change should be a no-op.
@squito - Good point, will do.
Can you update the JIRA and PR description to say "un-register the output locations" (or similar) instead of "remove the files"? The current description is misleading since nothing is actually getting removed from the machine -- instead you're just updating the scheduler's state. Also, does the issue here only arise when the shuffle service is enabled? It seems like when the shuffle service is not being used, DAGScheduler.handleExecutorLost should be called for each lost executor (i.e., for all of the executors on a host, when a whole host is lost), which will correctly un-register all of the output on that executor.
That is correct. When the shuffle service is not enabled, this change should be a no-op.
Why is this a no-op when the shuffle service isn't enabled? It looks like you mark the slave as lost in all cases?
@kayousterhout - You are right. It's kind of confusing that we are triggering a SlaveLost even in the case of executor lost.
Test build #73636 has finished for PR 17088 at commit
Test build #73638 has finished for PR 17088 at commit
Test build #73640 has finished for PR 17088 at commit
Jenkins retest this please.
Test build #73664 has finished for PR 17088 at commit
Can you please file a JIRA for the flaky jenkins failure?
+CC @tgravescs You might be interested in this given your comments on the blacklisting PR.
fyi, this is somewhat related to #17113. I need to refresh my memory on all the interactions and I'll get back.
@tgravescs - I agree this might cause additional work in situations where the shuffle fetch failure is transient, like you mentioned above. But in those cases, IMO, users should tune the shuffle retry configurations to make sure we do not see any fetch failure. The current model assumes the files are lost in case of fetch failure, and when the external shuffle service is enabled, we should clean all the files on a host as opposed to cleaning only those for a specific executor. @kayousterhout - Filed a JIRA for the flaky build - SPARK-19803
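The retry configurations being referred to are spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait (Spark defaults: 3 retries, 5s wait). A rough sketch of the outage window they tolerate:

```python
# Sketch of the shuffle-client retry window. The shuffle client retries a
# failed fetch up to spark.shuffle.io.maxRetries times, sleeping
# spark.shuffle.io.retryWait seconds between attempts, so a transient
# outage longer than roughly maxRetries * retryWait still surfaces as a
# fetch failure at the driver.
def tolerated_outage_seconds(max_retries=3, retry_wait_s=5):
    return max_retries * retry_wait_s

print(tolerated_outage_seconds())        # 15 -- the default window
print(tolerated_outage_seconds(12, 10))  # 120 -- tuned to ride out an NM restart
```

This ~15s default window is why rolling upgrades that take longer to restart node managers show up as fetch failures unless the configs are raised.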
In this particular case, are your map tasks fast or slow? If they are really fast, rerunning everything now makes sense; if each of those took 1 hour+ to run, failing all of them when they don't need to be just wastes time and resources. Rolling upgrades can take longer than 15 seconds to restart NMs. You can have intermittent issues that last > 1 minute. If it took 1 hour to generate that output, I want it to retry really hard before failing all of those. Users aren't going to tune each individual job unless they really have to, and it might vary per stage. Really it should use cost-based analysis on how long those tasks ran, but that gets more complicated. It is also possible that some reduce tasks have already fetched the data from those nodes and succeeded, and you wouldn't have to rerun all tasks on that host. Due to the way Spark cancels stages on fetch failure, whether the reduce tasks from the stage finish before the map can be rerun is very timing dependent. You could end up doing a lot more work than necessary, so the question is whether that is OK compared to the cost of not doing this and allowing the application to take a bit longer. Note that if you rerun all the maps and they didn't need to be, you might cause the app to take longer too. How often do you see issues with node managers going down for other than transient issues? Are you seeing jobs fail due to this or just take longer?
Note alternatively we could change it to not fail on fetch failure. This would seem better to me since there is no reason to throw away all the work you have done, but I'm sure that is a much bigger change.
How often are you doing rolling upgrades? I really think in those cases we should be tuning the shuffle fetch configurations to allow for rolling upgrades. Even without this change, the current model un-registers files in case of fetch failure, so you might be losing a lot of work already, and in the worst case you can still lose all the files present on a host.
I am not sure I get your point here, but this will not rerun reduce tasks that have already fetched data from those nodes and succeeded. We are seeing this issue very frequently, mainly because of node reboots. We are trying to scale Spark to clusters with thousands of machines, and the probability of seeing failures and reboots during long-running jobs is very high.
We are seeing both. As @squito mentioned, we only give 4 chances for a stage to be retried, so even one node reboot can trigger 4 retries and cause the job to fail. If the job gets lucky, it still takes significantly longer than expected because of multiple retries of a stage, and the way retries are handled by the scheduler is not elegant right now - it does not allow multiple attempts of a stage to run concurrently, which is a separate issue I will address in another PR.
First, I think we should change the hard-coded limit of 4 stage retries. It's clear to me there is an important reason why users would want a higher limit, so let's make it a config. That is a very simple change. (That doesn't mean we shouldn't be changing something else as well.) As with #17113, though this is a big change, it seems to actually be more consistent for Spark. Of course some failures are transient, but (as has already been pointed out) (a) even the existing behavior will make you do unnecessary work for transient failures and (b) this just slightly increases the amount of work that has to be repeated for those transient failures. I'm also wondering if there are other options, eg:
Test build #74694 has finished for PR 17088 at commit
Test build #74710 has finished for PR 17088 at commit
I'm reviewing code for style & clarity, without making a judgement yet on whether this is the right change in behavior. I'd like to keep hearing more opinions on that, though I'm leaning more and more to thinking this is a good idea.
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
  logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
log should be outside the for loop, like it was before
Ah, my bad, thanks for noticing.
@@ -1390,7 +1401,34 @@ class DAGScheduler(
      }
    } else {
      logDebug("Additional executor lost message for " + execId +
-       "(epoch " + currentEpoch + ")")
+       "(epoch " + currentEpoch + ")")
as long as you're updating this, can you change it to use interpolation instead, just for consistency?
done.
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
the number of partitions here needs to match the number used in the partitioner from its dependencies. Same below.
I know it doesn't matter in this test, but it becomes hard to understand what is going on in these tests if they have inconsistencies like this
You are right, it was confusing before. Changed accordingly.
private[scheduler] def removeExecutorAndUnregisterOutputOnHost(
    execId: String,
    host: String,
    maybeEpoch: Option[Long] = None) {
nit: add return type : Unit =
we didn't adopt that convention (nor did scala) till after a lot of this class was written, hence the inconsistency in this file, but new changes should follow this style.
done
  clearCacheLocs()
} else {
  logDebug("Additional executor lost message for " + execId +
    "(epoch " + currentEpoch + ")")
thanks for trying this refactoring, but I don't like the amount of repetition between these two helper methods now. Sorry, this code was very confusing even before, and I haven't given constructive suggestions so far ... what do you think of this version? squito@3e33d5e
Made changes as suggested, thanks!
One thing which I noticed while making sense of what was going on in the code (even before) -- IIRC, spark standalone is a bit of a special case. I think it used to be the case that to run multiple executors per node, you had to run multiple worker instances on the node. Eg., see mentions of SPARK_WORKER_INSTANCES here: http://spark.apache.org/docs/1.4.0/spark-standalone.html and though it's not documented, I think you can in fact still use multiple worker instances per node. Which means that when we get the WorkerLost msg in spark standalone, we aren't really sure if all shuffle files on that host have been lost or not. But I think the consistent thing to do would be to assume that there is just one worker per node, as that is the latest recommended configuration, and go ahead and remove all shuffle files on the node if the external shuffle service is enabled. Which would mean that we'd want to change the handling of the
One meta question here: why aren't we getting a SlaveLost message in this case? I'm asking since there's already code in #14931 to un-register shuffle service files when we get a SlaveLost message, and that seems like a more bulletproof way of handling the case where an entire slave goes down.
@kayousterhout I don't think #14931 is really a complete answer to this. (a) we only get that from standalone mode, no other cluster managers (yarn does not notify applications of failures of any node in the cluster). (Aside: (c) made me think more about whether we should be removing shuffle data when we blacklist, both for executors and nodes ... I think the behavior will be correct either way, but similar tradeoffs about which situation to optimize for.) I think that #14931 is just a small optimization when possible, not a mechanism that can be relied upon.
+1 on that. In our case, we are not seeing the SlaveLost message in most cases, and even when we do, it is delayed and we received the fetch failure before that. So, as @squito pointed out, we cannot rely on the SlaveLost message all the time.
Ok that makes sense. I wanted to make sure that there wasn't some bug in SlaveLost (which might lead to a simpler fix than this), but @squito's description makes it clear that there are a bunch of situations that SlaveLost can't handle correctly.
Test build #74768 has finished for PR 17088 at commit
jenkins retest this please.
Test build #74771 has finished for PR 17088 at commit
  // assume all shuffle data on the node is bad.
  Some(bmAddress.host)
} else {
  // Deregister shuffle data just for one executor (we don't have any
nit: "Unregister" is used elsewhere (function names, etc.), not "deregister".
@@ -1389,8 +1423,7 @@ class DAGScheduler(
        clearCacheLocs()
      }
    } else {
-     logDebug("Additional executor lost message for " + execId +
-       "(epoch " + currentEpoch + ")")
+     logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
nit: prefer string interpolation over format.
case SlaveLost(_, true) => true
val workerLost = reason match {
  case SlaveLost(_, true) =>
    true
nit: prefer it without the line break for something this simple
val filesLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
removeExecutorAndUnregisterOutputs(
  execId = execId,
  fileLost = filesLost,
The fileLost vs. filesLost naming difference is a little confusing -- is the distinction even conveying a difference worth paying attention to?
Yes, a transient fetch failure always causes some more work because you are going to re-run some map tasks, but the question comes down to how much work that is. For your (b), you can't really say it only "slightly increases" the work, because it's going to be highly dependent upon the timing of when things finish, how long maps take, and how long the reducers take. Please correct me if I've missed something in the Spark scheduler related to this.
This timing-dependent completion seems very unpredictable (and perhaps a bug in the commit logic, but I would have to look more). If your maps take a very long time and your reducers also take a long time, then you don't really want to re-run the maps. Of course, once it starts stage 1.1 for the reducers that didn't originally fail, if the nodemanager really is down, the new reducer task could fail to fetch data that the old ones had already received, so it would need to be re-run anyway even if the original reducer was still running and would have succeeded. Sorry if that doesn't make sense; much too complicated. There is also the case where one reducer fails and you are using a static # of executors, so you have 1 slot to rerun the maps. If you now fail 3 maps instead of 1, it's going to potentially take 3 times as long to rerun those maps. My point here is that I think it's very dependent upon the conditions; it could be faster, it could be slower. The question is which happens more often. Generally we see more intermittent issues with nodemanagers rather than them going down fully. If they are going down, is it due to them crashing or is the entire node going down? If the entire node is going down, are we handling that wrong or not as well as we could? If it's the nodemanager crashing, I would say you need to look and see why, as that shouldn't happen very often. I'm not sure which one could be better. Due to the way Spark schedules, I'm OK with invalidating all as well, but I think it would be better for us to fix this right, which to me means not throwing away the work of the first stage's running reducers. If we need something more short term, I think it would be better to wait for at least a couple of fetch failures or a % of reducers failed before invalidating all of it.
As per my understanding of the code, that is not the case. Currently, the task scheduler does not allow running multiple concurrent attempts of a particular stage (see - https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L172). So we wait till stage 0.1 finishes and rerun the failed maps in another retry, stage 0.2. This adds significant latency to the job run.
In our case, we see fetch failures for multiple reasons like node reboots, disks going bad, or network issues. It's very difficult for the cluster manager to detect these kinds of issues and inform the driver.
I know it's not ideal, but how about making this behavior configurable? i.e., only unregister all outputs on a host if the configuration is enabled, otherwise keep the existing behavior?
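A sketch of what that configurable behavior could look like (plain Python toy model; the flag name is hypothetical, chosen only for illustration):

```python
# Toy model of a config-gated unregistration decision. The config key is a
# hypothetical placeholder, not an actual Spark property.
HOST_UNREGISTER_FLAG = "spark.scheduler.unregisterOutputOnHostOnFetchFailure"

def partitions_to_unregister(conf, failed_exec, failed_host, outputs):
    """outputs: dict partition -> (executor_id, host). Returns the sorted
    partitions whose map output should be unregistered."""
    if conf.get(HOST_UNREGISTER_FLAG, False):
        # New behavior: invalidate everything served from the failed host.
        return sorted(p for p, (e, h) in outputs.items() if h == failed_host)
    # Default: keep the existing per-executor behavior.
    return sorted(p for p, (e, h) in outputs.items() if e == failed_exec)

outs = {0: ("exec-1", "hostA"), 1: ("exec-2", "hostA"), 2: ("exec-3", "hostB")}
print(partitions_to_unregister({}, "exec-1", "hostA", outs))                       # [0]
print(partitions_to_unregister({HOST_UNREGISTER_FLAG: True}, "exec-1", "hostA", outs))  # [0, 1]
```

Defaulting the flag to off preserves the existing behavior for users who mostly see transient failures, while clusters that see frequent whole-node loss can opt in.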
Test build #75184 has finished for PR 17088 at commit
Filed a JIRA SPARK-20091 to allow running multiple concurrent attempts of a stage. I will make this change as a part of another ongoing PR - #17297
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map{_.location.executorId}.toSet ===
map{..} => map(..)
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
conf.set("spark.shuffle.service.enabled", "true")
Should we add another test with spark.shuffle.service.enabled = false?
// reduce stage fails with a fetch failure from one host
complete(taskSets(2), Seq(
  (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), firstShuffleId, 0, 0, "ignored"),
Seems the FetchFailed message should reference shuffleDep.shuffleId instead of firstShuffleId?
gentle ping @sitalkedia
What changes were proposed in this pull request?
Currently, when we detect fetch failure, we only remove the shuffle files produced by the executor, while the host itself might be down and all the shuffle files are not accessible. In case we are running multiple executors on a host, any host going down currently results in multiple fetch failures and multiple retries of the stage, which is very inefficient. If we remove all the shuffle files on that host, on first fetch failure, we can rerun all the tasks on that host in a single stage retry.
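A back-of-the-envelope toy model (not Spark code) of the retry counts claimed above, assuming each executor's stale output surfaces as a separate fetch failure and each fetch failure costs one stage retry:

```python
# Toy model: a host running N executors dies. With per-executor
# unregistration, each executor's stale map output can surface as its own
# fetch failure, one stage retry at a time; with host-wide unregistration,
# the first fetch failure invalidates everything on the host at once.
def stage_retries_after_host_loss(executors_per_host, unregister_whole_host):
    if unregister_whole_host:
        return 1  # one retry reruns all the host's map tasks together
    return executors_per_host  # discovered one executor per retry

print(stage_retries_after_host_loss(4, unregister_whole_host=False))  # 4
print(stage_retries_after_host_loss(4, unregister_whole_host=True))   # 1
```

This is the best case for the old behavior; with the hard limit of 4 stage attempts, 4 executors per host already puts the job at the edge of failing outright.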
How was this patch tested?
Unit testing and also ran a job on the cluster and made sure multiple retries are gone.